iT邦幫忙

2022 iThome Ironman Contest

DAY 17
Self-Challenge Group

Spring In Action series, part 17

Components of integration flow

This post walks through the components that a Spring Integration flow can be built from.

  • Message channels

Channels are the pipes that carry messages from one component to the next.

  • Filters

Filters screen messages and let through only the ones we want to keep.

@Filter(inputChannel="stringChannel",
        outputChannel="containSChannel")
public boolean stringContainSFilter(String str) {
  return str.contains("s");
}

Or, using the DSL:

@Bean
public IntegrationFlow stringContainSFlow() {
  return IntegrationFlows
      ...
      .<String>filter((s) -> s.contains("s"))
      ...
      .get();
}
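
The predicate is all there is to a filter. As a rough analogy only, the plain-Java sketch below applies the same test to a list with java.util.stream. The class name ContainsSFilterSketch and its methods are made up for illustration, and no Spring Integration types are involved.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of Spring Integration.
final class ContainsSFilterSketch {

  // Same test as stringContainSFilter: accept only payloads containing "s".
  // Note that the check is case-sensitive.
  static boolean accept(String payload) {
    return payload.contains("s");
  }

  // Keeps the accepted payloads and drops the rest.
  static List<String> filter(List<String> payloads) {
    return payloads.stream()
        .filter(ContainsSFilterSketch::accept)
        .collect(Collectors.toList());
  }
}
```

Calling ContainsSFilterSketch.filter(List.of("spring", "flow", "messages")) keeps "spring" and "messages" and drops "flow".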

  • Transformers

Transformers work like map(Function<T,U> function) on a stream: they convert an incoming payload of type T into type U.

@Bean
@Transformer(inputChannel="numberChannel",
             outputChannel="romanNumberChannel")
public GenericTransformer<Integer, String> romanNumTransformer() {
  return RomanNumbers::toRoman;
}

DSL

@Bean
public IntegrationFlow transformerFlow() {
  return IntegrationFlows
      ...
      .transform(RomanNumbers::toRoman)
      ...
      .get();
}
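
The snippets above refer to RomanNumbers::toRoman without showing it. The following is one possible stand-in, assuming the helper is a static method that maps an int in the range 1 to 3999 to its Roman numeral. The range check and the greedy table are choices made for this sketch, not something the original specifies.

```java
// Hypothetical stand-in for the RomanNumbers helper referenced above.
final class RomanNumbers {
  private static final int[] VALUES =
      {1000, 900, 500, 400, 100, 90, 50, 40, 10, 9, 5, 4, 1};
  private static final String[] SYMBOLS =
      {"M", "CM", "D", "CD", "C", "XC", "L", "XL", "X", "IX", "V", "IV", "I"};

  private RomanNumbers() {}

  // Converts 1..3999 to a Roman numeral by repeatedly appending the symbol
  // for the largest value that still fits into the remainder.
  static String toRoman(int number) {
    if (number < 1 || number > 3999) {
      throw new IllegalArgumentException("Expected 1..3999 but got " + number);
    }
    StringBuilder roman = new StringBuilder();
    int remaining = number;
    for (int i = 0; i < VALUES.length; i++) {
      while (remaining >= VALUES[i]) {
        roman.append(SYMBOLS[i]);
        remaining -= VALUES[i];
      }
    }
    return roman.toString();
  }
}
```

With this version, RomanNumbers.toRoman(14) returns "XIV" and RomanNumbers.toRoman(2022) returns "MMXXII".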

  • Routers

Routers dispatch each message to one of several channels based on a condition.

@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
  return new AbstractMessageRouter() {
    @Override
    protected Collection<MessageChannel>
              determineTargetChannels(Message<?> message) {
      Integer number = (Integer) message.getPayload();
      if (number % 2 == 0) {
        return Collections.singleton(evenChannel());
      }
      return Collections.singleton(oddChannel());
    }
  };
}
 
@Bean
public MessageChannel evenChannel() {
  return new DirectChannel();
}
 
@Bean
public MessageChannel oddChannel() {
  return new DirectChannel();
}

DSL

@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
  return IntegrationFlows
    ...
      .<Integer, String>route(n -> n%2==0 ? "EVEN":"ODD", mapping -> mapping
        .subFlowMapping("EVEN", sf -> sf
            .<Integer, Integer>transform(n -> n * 10)
            .handle((i,h) -> { ... })
            )
        .subFlowMapping("ODD", sf -> sf
            .transform(RomanNumbers::toRoman)
            .handle((i,h) -> { ... })
            )
        )
      .get();
}
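
Stripped of the messaging machinery, the route step is a key function plus a lookup table of branches. The plain-Java sketch below mirrors that shape. EvenOddRoutingSketch and its members are hypothetical names, and the ODD branch is only a placeholder that tags the number, since the original elides what the handlers do.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration class; not part of Spring Integration.
final class EvenOddRoutingSketch {

  // One branch per key, playing the role of the subFlowMapping entries.
  // The ODD branch is a placeholder; the flow above uses RomanNumbers::toRoman.
  private static final Map<String, Function<Integer, Object>> BRANCHES = Map.of(
      "EVEN", n -> n * 10,
      "ODD", n -> "odd:" + n);

  // Same key function as the route(...) call above.
  static String routingKey(int n) {
    return n % 2 == 0 ? "EVEN" : "ODD";
  }

  // Looks up the branch for the payload's key and applies it.
  static Object route(int n) {
    return BRANCHES.get(routingKey(n)).apply(n);
  }
}
```

Here EvenOddRoutingSketch.route(4) takes the EVEN branch and yields 40, while route(7) takes the ODD branch.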

  • Splitters

Splitters break one message into several messages, which can then be sent on to different channels.

public class OrderSplitter {

  // Breaks a purchase order into its billing info and its list of line items.
  public Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
    ArrayList<Object> parts = new ArrayList<>();
    parts.add(po.getBillingInfo());
    parts.add(po.getLineItems());
    return parts;
  }

}

// Declared in a configuration class, not inside OrderSplitter itself.
@Bean
@Splitter(inputChannel="poChannel",
          outputChannel="splitOrderChannel")
public OrderSplitter orderSplitter() {
  return new OrderSplitter();
}

The code above splits one order into its parts and sends them to splitOrderChannel. A router like the following can then route the split messages from splitOrderChannel to other channels:

@Bean
@Router(inputChannel="splitOrderChannel")
public MessageRouter splitOrderRouter() {
  PayloadTypeRouter router = new PayloadTypeRouter();
  router.setChannelMapping(
      BillingInfo.class.getName(), "billingInfoChannel");
  router.setChannelMapping(
      List.class.getName(), "lineItemsChannel");
  return router;
}

// Second split: turns the List<LineItem> payload into one message per line item.
@Splitter(inputChannel="lineItemsChannel", outputChannel="lineItemChannel")
public List<LineItem> lineItemSplitter(List<LineItem> lineItems) {
  return lineItems;
}
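
To make the two-stage split easier to follow, here is a plain-Java sketch without any Spring types. The domain classes are hypothetical minimal versions (the original does not show them, and the fields cardNumber, sku, and quantity are invented), and channelFor only imitates what a payload-type router decides.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical minimal domain types; the original does not show them.
final class BillingInfo {
  final String cardNumber;
  BillingInfo(String cardNumber) { this.cardNumber = cardNumber; }
}

final class LineItem {
  final String sku;
  final int quantity;
  LineItem(String sku, int quantity) { this.sku = sku; this.quantity = quantity; }
}

final class PurchaseOrder {
  private final BillingInfo billingInfo;
  private final List<LineItem> lineItems;
  PurchaseOrder(BillingInfo billingInfo, List<LineItem> lineItems) {
    this.billingInfo = billingInfo;
    this.lineItems = lineItems;
  }
  BillingInfo getBillingInfo() { return billingInfo; }
  List<LineItem> getLineItems() { return lineItems; }
}

final class OrderSplitSketch {

  // First split: one order becomes two parts, the billing info and the
  // whole list of line items (still a single payload at this point).
  static Collection<Object> splitOrderIntoParts(PurchaseOrder po) {
    List<Object> parts = new ArrayList<>();
    parts.add(po.getBillingInfo());
    parts.add(po.getLineItems());
    return parts;
  }

  // Imitates payload-type routing: picks a channel name from the runtime type.
  static String channelFor(Object part) {
    if (part instanceof BillingInfo) {
      return "billingInfoChannel";
    }
    if (part instanceof List) {
      return "lineItemsChannel";
    }
    throw new IllegalArgumentException("Unexpected part: " + part);
  }
}
```

The list of line items arrives on lineItemsChannel as one message, which is why a second splitter is needed to get one message per LineItem.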

DSL

return IntegrationFlows
  ...
    .split(orderSplitter())
    .<Object, String> route(
        p -> {
          if (p instanceof BillingInfo) {
            return "BILLING_INFO";
          } else {
            return "LINE_ITEMS";
          }
        }, mapping -> mapping
          .subFlowMapping("BILLING_INFO", sf -> sf
              .<BillingInfo> handle((billingInfo, h) -> {
                ...
              }))
          .subFlowMapping("LINE_ITEMS", sf -> sf
              .split()
              .<LineItem> handle((lineItem, h) -> {
                ...
              }))
 
        )
    .get();

If that feels too cramped, the lambdas can be pulled out into separate methods and referenced from the flow:

private String route(Object p) {
  return p instanceof BillingInfo
      ? "BILLING_INFO"
      : "LINE_ITEMS";
}
 
private BillingInfo handleBillingInfo(
        BillingInfo billingInfo, MessageHeaders h) {
  // ...
}
 
private LineItem handleLineItems(
        LineItem lineItem, MessageHeaders h) {
  // ...
}

return IntegrationFlows
  ...
    .split()
    .route(
      this::route,
      mapping -> mapping
        .subFlowMapping("BILLING_INFO", sf -> sf
          .<BillingInfo> handle(this::handleBillingInfo))
        .subFlowMapping("LINE_ITEMS", sf -> sf
          .split()
          .<LineItem> handle(this::handleLineItems)))
    .get();

  • Service activators

When a message arrives on a channel, a service activator can trigger other processing by handing the message to a handler.

@Bean
@ServiceActivator(inputChannel="someChannel")
public MessageHandler sysoutHandler() {
  return message -> {
    System.out.println("Message payload:  " + message.getPayload());
  };
}

// A GenericHandler returns a value, which is sent on to the output channel.
@Bean
@ServiceActivator(inputChannel="orderChannel",
                  outputChannel="completeChannel")
public GenericHandler<EmailOrder> orderHandler(OrderRepository orderRepo) {
  return (payload, headers) -> {
    return orderRepo.save(payload);
  };
}

DSL

public IntegrationFlow someFlow() {
  return IntegrationFlows
    ...
      .handle(msg -> {
        System.out.println("Message payload:  " + msg.getPayload());
       })
      .get();
}

  • Gateways

A gateway is the interface between the application and an integration flow:

package sia6;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.stereotype.Component;
 
@Component
@MessagingGateway(defaultRequestChannel="inChannel",
                  defaultReplyChannel="outChannel")
public interface UpperCaseGateway {
  String uppercase(String in);
}

DSL

@Bean
public IntegrationFlow uppercaseFlow() {
  return IntegrationFlows
    .from("inChannel")
    .<String, String> transform(s -> s.toUpperCase())
    .channel("outChannel")
    .get();
}

  • Channel adapters

Channel adapters are the entry and exit points through which messages enter and leave an integration flow.

@Bean
@InboundChannelAdapter(poller=@Poller(fixedRate="1000"), channel="numberChannel")
public MessageSource<Integer> numberSource(AtomicInteger source) {
  return () -> {
    return new GenericMessage<>(source.getAndIncrement());
  };
}

DSL

@Bean
public IntegrationFlow someFlow(AtomicInteger integerSource) {
  return IntegrationFlows
      .from(integerSource, "getAndIncrement",
          c -> c.poller(Pollers.fixedRate(1000)))
    ...
      .get();
}

Spring provides many ready-made endpoint modules that can be used directly as channel adapters:

@Bean
@InboundChannelAdapter(channel="file-channel",poller=@Poller(fixedDelay="1000"))
public MessageSource<File> fileReadingMessageSource() {
  FileReadingMessageSource sourceReader = new FileReadingMessageSource();
  sourceReader.setDirectory(new File(INPUT_DIR));
  sourceReader.setFilter(new SimplePatternFileListFilter(FILE_PATTERN));
  return sourceReader;
}

DSL

@Bean
public IntegrationFlow fileReaderFlow() {
  return IntegrationFlows
      .from(Files.inboundAdapter(new File(INPUT_DIR))
          .patternFilter(FILE_PATTERN))
      .get();
}

The endpoints that Spring Integration supports are listed in the following documentation:

[https://docs.spring.io/spring-integration/reference/html/endpoint-summary.html]

